package org.xqh.study.mq.rocketmq.rocketmqclient;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.xqh.study.mq.rocketmq.RocketMQConst;

import java.util.List;

/**
 * @ClassName RocketMQConsumer
 * @Description TODO
 * @Author xuqianghui
 * @Date 2023/10/26 19:27
 * @Version 1.0
 */
public class RocketMQConsumer {

    public static void main(String[] args) throws MQClientException {
        // 构造Consumer时，必须指定groupId
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
        consumer.setNamesrvAddr(RocketMQConst.endpoints); // nameServer地址,用于获取broker、topic信息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定订阅的主题与tag，通过tag可以定制性消费（*表示全部tag）
        consumer.subscribe(RocketMQConst.testTopic, "*");

        // 异步消费
        consumer.registerMessageListener(new
             MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//                System.out.println("Receive Message:" + msgs.toString());
                     // 1 try catch(throwable)确保不会因为业务逻辑的异常，导致消息出现重复消费的现象
                     // 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获，
                     //   并且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
                     try {
                         for (MessageExt msg : msgs) {
                             String msgbody = new String(msg.getBody(), "utf-8");
                             System.out.println(" MessageBody: " + msgbody);//输出消息内容
                         }
                     } catch (Exception e) {
                         e.printStackTrace();
                         return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                     }

                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收
                 }
             });
        //启动消费者
        consumer.start();
        System.out.println("消费者启动成功。。。");
    }
}
